{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**_pySpark Basics: Summary Statistics_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_by Jeff Levy (jlevy@urban.org)_\n",
    "\n",
    "_Last Updated: 8 Aug 2016, Spark v2.0_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: Here we will cover several common ways to summarize data.  Many of these methods have been dicussed in other tutorials in different contexts._\n",
    "\n",
    "_Main operations used: `describe`, `skewness`, `kurtosis`, `collect`, `select`_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First we will load the same csv data we've been using in many other tutorials, then pare it down to a manageable subset for ease of use:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', header=False, inferSchema=True, sep='|')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = df[['_c0', '_c2', '_c3', '_c4', '_c5', '_c6']]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-----+-----+----+---+---+\n",
      "|         _c0|  _c2|  _c3| _c4|_c5|_c6|\n",
      "+------------+-----+-----+----+---+---+\n",
      "|100002091588|OTHER|4.125|null|  0|360|\n",
      "|100002091588| null|4.125|null|  1|359|\n",
      "|100002091588| null|4.125|null|  2|358|\n",
      "|100002091588| null|4.125|null|  3|357|\n",
      "|100002091588| null|4.125|null|  4|356|\n",
      "+------------+-----+-----+----+---+---+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that the format `_c0`, `_c1`, `...`, `_cN` is the default column names Spark uses if your data doesn't come with headers.  For more on this, and renaming them, see the pySpark tutorial named *basics 1.ipynb*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Describe\n",
    "\n",
    "The first thing we'll do is use the `describe` method to get some basics.  Note that **describe will return a new dataframe** with the parameters, so we'll assign the results to a new variable and then call `show` on it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+\n",
      "|summary|                 _c0|                 _c2|                _c3|               _c4|               _c5|              _c6|\n",
      "+-------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+\n",
      "|  count|             3526154|              382039|            3526154|           1580402|           3526154|          3526154|\n",
      "|   mean|5.503885995001908E11|                null|  4.178168090219519|234846.78065481762| 5.134865351881966|354.7084951479714|\n",
      "| stddev|2.596112361975214...|                null|0.34382335723646673|118170.68592261661|3.3833930336063465| 4.01181251079202|\n",
      "|    min|        100002091588|  CITIMORTGAGE, INC.|               2.75|              0.85|                -1|              292|\n",
      "|    max|        999995696635|WELLS FARGO BANK,...|              6.125|        1193544.39|                34|              480|\n",
      "+-------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_described = df.describe()\n",
    "df_described.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Aside from the five included in `describe`, there are a handful of other built-in aggregators that can be applied to a column.  Here we'll apply the `skewness` function to column `_c3`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|     skewness(_c3)|\n",
      "+------------------+\n",
      "|0.5197993394959904|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import skewness, kurtosis\n",
    "from pyspark.sql.functions import var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile\n",
    "df.select(skewness('_c3')).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Expanding the Describe Output\n",
    "\n",
    "One convenient thing we might want to do is put all our summary statistics together in one spot - in essence, expand the output from `describe`.  Below I'll go into a short example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Row(_c0='-0.00183847089866', _c2='None', _c3='0.519799339496', _c4='0.758411576756', _c5='0.286480156084', _c6='-2.69765201567', summary='skew'), Row(_c0='-1.19900726351', _c2='None', _c3='0.126057726847', _c4='0.576085602656', _c5='0.195187780089', _c6='24.7237858944', summary='kurtosis')]\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "columns = df_described.columns  #list of column names: ['summary', '_c0', '_c3', '_c4', '_c5', '_c6']\n",
    "funcs   = [skewness, kurtosis]  #list of functions we want to include (imported earlier)\n",
    "fnames  = ['skew', 'kurtosis']  #a list of strings describing the functions in the same order\n",
    "\n",
    "def new_item(func, column):\n",
    "    \"\"\"\n",
    "    This function takes in an aggregation function and a column name, then applies the aggregation to the\n",
    "    column, collects it and returns a value.  The value is in string format despite being a number, \n",
    "    because that matches the output of describe.\n",
    "    \"\"\"\n",
    "    return str(df.select(func(column)).collect()[0][0])\n",
    "\n",
    "new_data = []\n",
    "for func, fname in zip(funcs, fnames):\n",
    "    row_dict = {'summary':fname}  #each row object begins with an entry for \"summary\"\n",
    "    for column in columns[1:]:\n",
    "        row_dict[column] = new_item(func, column)\n",
    "    new_data.append(Row(**row_dict))  #using ** tells Python to unpack the entries of the dictionary\n",
    "    \n",
    "print(new_data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This code iterates through the entries in `funcs` and `fnames` together, then builds a new row object following the format of the standard `describe` output.  You can see from the output that it looks nearly identical to the output of `collect` when applied to a dataframe:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(summary=u'count', _c0=u'3526154', _c2=u'382039', _c3=u'3526154', _c4=u'1580402', _c5=u'3526154', _c6=u'3526154'),\n",
       " Row(summary=u'mean', _c0=u'5.503885995001908E11', _c2=None, _c3=u'4.178168090219519', _c4=u'234846.78065481762', _c5=u'5.134865351881966', _c6=u'354.7084951479714'),\n",
       " Row(summary=u'stddev', _c0=u'2.5961123619752148E11', _c2=None, _c3=u'0.34382335723646673', _c4=u'118170.68592261661', _c5=u'3.3833930336063465', _c6=u'4.01181251079202'),\n",
       " Row(summary=u'min', _c0=u'100002091588', _c2=u'CITIMORTGAGE, INC.', _c3=u'2.75', _c4=u'0.85', _c5=u'-1', _c6=u'292'),\n",
       " Row(summary=u'max', _c0=u'999995696635', _c2=u'WELLS FARGO BANK, N.A.', _c3=u'6.125', _c4=u'1193544.39', _c5=u'34', _c6=u'480')]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_described.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Although the columns are out of order within the rows; this is because we built them from a dictionary, and dictionary entries in Python are inherently unordered.  We will fix that below.\n",
    "\n",
    "The next step is to join the two sets of data into one, in order to make a modified `describe` output that includes skew and kurtosis.  The same method could be used to include any other aggregations desired."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+\n",
      "| summary|                 _c0|                 _c2|                _c3|               _c4|               _c5|              _c6|\n",
      "+--------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+\n",
      "|   count|             3526154|              382039|            3526154|           1580402|           3526154|          3526154|\n",
      "|    mean|5.503885995001908E11|                null|  4.178168090219519|234846.78065481762| 5.134865351881966|354.7084951479714|\n",
      "|  stddev|2.596112361975214...|                null|0.34382335723646673|118170.68592261661|3.3833930336063465| 4.01181251079202|\n",
      "|     min|        100002091588|  CITIMORTGAGE, INC.|               2.75|              0.85|                -1|              292|\n",
      "|     max|        999995696635|WELLS FARGO BANK,...|              6.125|        1193544.39|                34|              480|\n",
      "|    skew|   -0.00183847089866|                None|     0.519799339496|    0.758411576756|    0.286480156084|   -2.69765201567|\n",
      "|kurtosis|      -1.19900726351|                None|     0.126057726847|    0.576085602656|    0.195187780089|    24.7237858944|\n",
      "+--------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "new_describe = sc.parallelize(new_data).toDF()           #turns the results from our loop into a dataframe\n",
    "new_describe = new_describe.select(df_described.columns) #forces the columns into the same order\n",
    "\n",
    "expanded_describe = df_described.unionAll(new_describe)  #merges the new stats with the original describe\n",
    "expanded_describe.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "And now we have our expanded `describe` output."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
